Workflowsが多重起動されないようにする方法をCloud Run Functionsを用いる方法で考えてみた
はじめに
データパイプライン構築などでWorkflowsを用いている場合は同じワークフローを多重起動することを避けたい場合があると思います。多重起動を防止するためにはFirestoreなど何らかのDBに記録しておく方法がよくあると思いますが、
ワークロードによっては使用するリソースを増やしたくない時もあると思います。
以前のブログではWorkflowsのWorkflowsコネクタを用いてWorkflowsのワークフロー内で多重起動する方法を考えて試してみました。
しかしながら上記の方法ではミリ秒の差で起動された場合は正しく重複起動判定ができませんでした。
そこで今回はミリ秒の差で起動された場合でも正しく重複起動判定ができるような実装を考えてみました。
概要
指定したワークフローの実行結果一覧を取得できるprojects.locations.workflows.executions.list
APIを用います。
ListAPIの概要
ListAPIは引数に指定した名前のワークフローの実行履歴を返却します。また返却される値は実行時間の降順(新しい順)となっています。
以下が代表的な引数です。
引数 | 説明 |
---|---|
parent | 必須。projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名 の形式で指定する |
pageSize | 返却される実行履歴の最大数。pageSizeを設定しなかった場合は100。view に設定した値で設定できる最大値は異なる |
pageToken | ページネーションを行う場合はpageTokenを用いる |
view | 返却されるフィールド値を制御する引数。EXECUTION_VIEW_UNSPECIFIED ・BASIC ・FULL から選択。省略した場合はBASIC がデフォルト値。BASIC の場合はname ・start_time 、end_time 、state 、workflow_revision_id が返却される。 |
代表的なレスポンスパラメータを以下に示します。
パラメータ | 説明 |
---|---|
name | 実行履歴名。projects/プロジェクトID/locations/リージョン/workflows/ワークフロー名/executions/ワークフロー実行ID の形式。ワークフロー実行IDとほぼ同義 |
state | 実行状態。今回の肝なので返却値のパターンは別表 |
startTime | ワークフロー実行開始時間 |
workflowRevisionId | 実行されたワークフローのリビジョン |
上表の実行状態(state
)が返却する値は以下です。
state | 説明 |
---|---|
STATE_UNSPECIFIED | 無効な状態 |
ACTIVE | ワークフロー実行中 |
SUCCEEDED | ワークフローが実行に成功して終了 |
FAILED | ワークフローの実行に失敗 |
CANCELLED | ワークフローの実行がキャンセル |
どうやって重複起動か判断するか
ListAPIで取得した実行結果一覧のなかで実行中のワークフローの開始時間(startTime
)を重複起動判定時の条件に加えます。
判定対象のワークフロー実行が、ACTIVEなワークフロー実行かつ最も古い開始時間の実行ではない場合
は重複起動と判断し、ワークフローを実行しないようにすればよいと考えました。
言い換えるとACTIVEなワークフローで最も古い開始時間の場合
は実行を継続して問題ない、と判断します。
開始時間をもとに判定をしたいので、pythonで判定処理を実装してCloud Run FunctionsにデプロイしてWorkflowsからはHTTPリクエストで呼び出すようにしました。
Workflows(YAML上)で開始時間の比較を行うことが難しいためです。
判定用スクリプト
何はともあれスクリプト全文です。今回はWorkflowsのpythonクライアントライブラリを使用しました。
import functions_framework
from google.cloud.workflows.executions_v1 import ExecutionsClient
from google.cloud.workflows.executions_v1.types import ListExecutionsRequest, Execution
from flask import jsonify, abort
def can_execute_workflow(project_num: str, location: str, workflow_name: str, current_execution_name: str):
"""
指定されたワークフローの多重起動判定関数
現在の実行がACTIVE状態の中で最も開始時間が古いかどうかを判定
Args:
project_num (str): プロジェクト番号
location (str): ワークフローがデプロイされているリージョン
workflow_name (str): ワークフローの名前
current_execution_name (str): 現在の実行名
Returns:
dict: 実行可否と関連情報を含むレスポンス
- "can_execute" (bool): 実行可能であればTrue
- "message" (str): 実行可否の理由
- "latest_active_execution" (dict or None): 現在のACTIVEな実行の中で最も古い実行の情報
"""
client = ExecutionsClient()
parent = f"projects/{project_num}/locations/{location}/workflows/{workflow_name}"
try:
# 実行リストを取得
request = ListExecutionsRequest(parent=parent)
response = client.list_executions(request=request)
# ACTIVE状態の実行をフィルタリング
active_executions = [
execution for execution in response.executions
if execution.state == Execution.State.ACTIVE # Enumの値と比較
]
# ACTIVE状態の実行がない場合実行可能
if not active_executions:
return {
"can_execute": True,
"message": "No ACTIVE executions found. Safe to execute.",
"latest_active_execution": None,
}
# 最も開始時間が古いACTIVEな実行を取得
oldest_execution = min(
active_executions,
key=lambda execution: execution.start_time
)
# 現在の実行が最も古いかどうか判定
if oldest_execution.name == current_execution_name:
return {
"can_execute": True,
"message": "Current execution is the oldest ACTIVE execution. Safe to execute.",
"latest_active_execution": {
"name": oldest_execution.name,
"state": Execution.State(oldest_execution.state).name,
"start_time": oldest_execution.start_time,
},
}
else:
return {
"can_execute": False,
"message": "Current execution is not the oldest ACTIVE execution. Do not execute.",
"latest_active_execution": {
"name": oldest_execution.name,
"state": Execution.State(oldest_execution.state).name,
"start_time": oldest_execution.start_time,
},
}
except Exception as e:
return {
"can_execute": False,
"message": f"Error occurred while checking executions: {e}",
"latest_active_execution": None,
}
# Cloud Functions HTTPエントリポイント
@functions_framework.http
def check_workflows_run(request):
"""
Cloud FunctionsのHTTPエントリポイント
JSONリクエストを受け取り、ワークフローの実行可否を判定
Args:
request (flask.Request): HTTPリクエストオブジェクト
Returns:
flask.Response: 実行可否の結果を含むJSONレスポンス
"""
try:
# JSONデータを取得
data = request.get_json()
if not data:
abort(400, description="Invalid JSON payload")
# 必須フィールドのチェック
required_fields = ["project_num", "location", "workflow_name", "execution_id"]
for field in required_fields:
if field not in data:
abort(400, description=f"Missing required field: {field}")
# 必要な情報を取得
project_num = data['project_num']
location = data['location']
workflow_name = data['workflow_name']
execution_id = data['execution_id']
current_execution_name = f"projects/{project_num}/locations/{location}/workflows/{workflow_name}/executions/{execution_id}"
# 実行可否を判定
result = can_execute_workflow(project_num, location, workflow_name, current_execution_name)
# 実行可能の場合は200を返す
if result["can_execute"]:
return jsonify(result), 200
# 実行不可の場合は503を返す
return jsonify(result), 503
except Exception as e:
return jsonify({"error": f"Unexpected error: {e}"}), 500
処理の概要を以下に解説します。
can_execute_workflow
関数
ワークフローの実行リストを取得し、現在の実行が「最も古いACTIVE
状態の実行」であるかどうか判定
引数
project_num
(str): プロジェクト番号location
(str): ワークフローがデプロイされているリージョンworkflow_name
(str): ワークフローの名前current_execution_name
(str): 現在の実行名(APIリクエストに必要な形式で表現)
処理フロー
-
Workflows API の呼び出し:
ExecutionsClient
を使用して、指定されたワークフローの実行リストを取得
-
ACTIVE
状態の実行をフィルタリング:- 実行リストの中から、
Execution.State.ACTIVE
の状態を持つものだけを抽出(=実行状態がACTIVE)
- 実行リストの中から、
-
ACTIVE
実行が存在しない場合:- 判定対象ワークフロー実行が実行可能であると判断し、
can_execute=True
を返却
- 判定対象ワークフロー実行が実行可能であると判断し、
-
最も古い
ACTIVE
実行の判定:start_time
を基準に、最も古いACTIVE
実行を取得
-
現在の実行が最も古いか判定:
- 現在の実行が最も古い場合は
can_execute=True
を返却(実行OK)、それ以外の場合はcan_execute=False
(実行NG)を返却
- 現在の実行が最も古い場合は
戻り値
can_execute
(bool): 実行可能であればTrue
、それ以外はFalse
message
(str): 実行可否の判定結果latest_active_execution
(dict or None): 最も古いACTIVE
な実行の情報(名前、状態、開始時刻)
check_workflows_run
関数
Cloud Run FunctionsのHTTPエントリポイント。リクエストを受け取り、can_execute_workflow
関数を呼び出して結果を返却する
処理フロー
-
リクエストの検証:
- リクエストボディが有効な JSON 形式であるかを確認
- 必須フィールド(
project_num
、location
、workflow_name
、execution_id
)が存在するかをチェック
-
実行可否の判定:
can_execute_workflow
関数を呼び出し、実行可否を判定
-
レスポンスの生成:
- 実行可能な場合は、
200 OK
を返却 - 実行不可の場合は、
503 Service Unavailable
を返却 - 処理中にエラーが発生した場合は、
500 Internal Server Error
を返却
- 実行可能な場合は、
requirements.txt
は以下です。
functions-framework==3.*
google-cloud-workflows
テスト用のWorkflows
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- project_num: ${sys.get_env("GOOGLE_CLOUD_PROJECT_NUMBER")}
- workflow_exec_id: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_EXECUTION_ID")}
- workflow_name: ${sys.get_env("GOOGLE_CLOUD_WORKFLOW_ID")}
- workflow_location: ${sys.get_env("GOOGLE_CLOUD_LOCATION")}
- functions_url: "作成したCloud Run FunctionsのURL"
- checkRunsStep:
call: http.post
args:
url: ${functions_url}
auth:
type: OIDC
body:
project_num: ${project_num}
location: ${workflow_location}
workflow_name: ${workflow_name}
execution_id: ${workflow_exec_id}
timeout: 500
result: processResult
- sleepStep:
call: sys.sleep
args:
seconds: 30
- endStep:
return: ${processResult}
プロジェクト番号、リージョン、ワークフロー名、ワークフローの実行IDを引数にCloud Run Functions関数を呼び出します。
チェック結果でOKとなれば実行が継続しますし、NG(多重起動)と判定された場合は503エラーが返ってくるのでワークフロー実行は例外発生で終了します。
※30秒sleep処理を入れていますが、これは実装時にテストのため使っていました。30秒のsleep処理の間に手で再度ワークフローを再実行して多重起動判定されるかどうかをみたりなど・・・不要なので消しても大丈夫です。
テスト用のシェルスクリプト
ミリ秒レベルでワークフロー実行したいのでシェルで並列起動するようにしてみました。
#!/bin/bash
# ワークフロー名とリージョンを設定
WORKFLOW_NAME="ワークフロー名"
LOCATION="asia-northeast1"
# 5回並列実行
for i in {1..5}; do
echo "Executing workflow $i..."
gcloud workflows execute "$WORKFLOW_NAME" --location="$LOCATION" &
done
# 全てのバックグラウンドプロセスが完了するのを待つ
wait
echo "All executions completed."
これを実行すればミリ秒レベルでワークフローが実行できます(端末のスペックにもよります・・・)
いざ、テストしてみる
シェルスクリプトを実行します。
sh xxx.sh
というような感じで。
ワクワクしながら結果を見てみます。
gcloud workflows executions list "ワークフロー名" --location=asia-northeast1
5件の結果が返ってきています。観察します。
Execution ID | State | Start Time | End Time |
---|---|---|---|
bdbf9d03-a0a2-4afe-9673-da596e618841 | FAILED | 2025-01-13T13:56:46.039515471Z | 2025-01-13T13:56:48.640074167Z |
1a5cd146-2d41-4a6e-9b5c-2a99f392956e | FAILED | 2025-01-13T13:56:46.039425206Z | 2025-01-13T13:56:48.518246471Z |
cc3f1cd3-2300-465f-bd4f-dcaf5886fe0c | FAILED | 2025-01-13T13:56:46.039418308Z | 2025-01-13T13:56:48.665984647Z |
61035a2a-dbec-49d5-a6fc-63c4fe4c5573 | FAILED | 2025-01-13T13:56:46.030504920Z | 2025-01-13T13:56:48.524949182Z |
44812407-c4ac-4a08-a4c0-e79e50c3389a | SUCCEEDED | 2025-01-13T13:56:46.028397865Z | 2025-01-13T13:57:08.549466843Z |
1つのワークフロー実行だけSUCCEEDED
となっており実行に成功しています。また、各ワークフロー実行のStart Time(開始時間)
を見るとミリ秒レベルでの起動となっているのが確認できます。
ミリ秒レベルで同時に起動されても、多重起動判定を行い1つの処理だけ実行することができている様子が上記の結果よりわかります。いやーよかった。
注意
この実装では、ミリ秒レベルの多重起動にも対応できると考えています。が、同じワークフローを呼び出し引数を変えて実行する場合は対応できません。
正しい呼び出しも多重起動扱いしてしまいます。
このような場合はList APIを用いた判定だけでは難しいと考えます・・・
この場合はFirestoreなど外部の記録媒体に呼び出し引数などをセットで記録してそれぞれのグループごとに多重起動判定をする必要があると考えます。
おわりに
今回実装したような多重起動判定用のCloud Run Functions関数をWorkflowsのワークフローの最初の方のステップに実装し判定することでワークフローの多重起動を防ぐことが可能になると思われます。
他にもいろいろな方法があると思いますし、今回の方法もベストとは言い難い方法とも思えています。そしてもっといろいろな方法を探ってみようと思っています。
それではまた、ナマステー
参考